package org.zjvis.datascience.service;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.zjvis.datascience.common.constant.Constant;
import org.zjvis.datascience.common.dto.PipelineSnapshotDTO;
import org.zjvis.datascience.common.dto.TaskDTO;
import org.zjvis.datascience.common.dto.TaskInstanceDTO;
import org.zjvis.datascience.common.dto.TaskInstanceSnapshotDTO;
import org.zjvis.datascience.common.dto.TaskSnapshotDTO;
import org.zjvis.datascience.common.enums.TaskTypeEnum;
import org.zjvis.datascience.common.exception.BaseErrorCode;
import org.zjvis.datascience.common.exception.DataScienceException;
import org.zjvis.datascience.common.util.DozerUtil;
import org.zjvis.datascience.common.util.FileUtil;
import org.zjvis.datascience.common.util.JwtUtil;
import org.zjvis.datascience.common.util.RedisUtil;
import org.zjvis.datascience.common.vo.PipelineSnapshotQueryVO;
import org.zjvis.datascience.common.vo.PipelineSnapshotVO;
import org.zjvis.datascience.service.mapper.PipelineSnapshotMapper;
import org.zjvis.datascience.service.mapper.TaskInstanceMapper;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.zjvis.datascience.service.mapper.TaskMapper;

/**
 * Service for pipeline snapshots (saved versions of a pipeline's data view).
 *
 * <p>A snapshot consists of one {@code pipeline_snapshot} row, one {@code task_snapshot} row per
 * task (the task serialized as JSON), optional {@code task_instance_snapshot} rows for clean-type
 * tasks, and a picture stored in MinIO under {@link Constant#PIPELINE_SNAPSHOT_MINIO_BUCKET}.
 *
 * <p>NOTE(review): MinIO writes/deletes are not part of the database transaction, so a rollback
 * does not undo them — confirm this is acceptable for the save/delete flows.
 *
 * @date 2021-10-18
 */
@Service
public class PipelineSnapshotService {
    private static final Logger logger = LoggerFactory.getLogger(PipelineSnapshotService.class);

    @Autowired
    private TaskInstanceSnapshotService taskInstanceSnapshotService;

    @Autowired
    private PipelineSnapshotMapper pipelineSnapshotMapper;

    @Autowired
    private TaskSnapshotService taskSnapshotService;

    @Autowired
    private TaskInstanceMapper taskInstanceMapper;

    @Autowired
    private MinioService minioService;

    // Injected lazily; presumably to break a circular dependency with TaskService — TODO confirm.
    @Lazy
    @Autowired
    private TaskService taskService;

    @Autowired
    private RedisUtil redisUtil;

    @Autowired
    private TaskMapper taskMapper;

    /**
     * Saves a snapshot of the given pipeline.
     *
     * <p>The operation is guarded by a per-pipeline Redis lock. Inside the lock it checks the
     * snapshot name for uniqueness, serializes every task of the pipeline (and the task instances
     * of clean-type tasks) to JSON, evicts the oldest snapshot when the capacity is reached,
     * uploads the picture to MinIO and finally writes the snapshot rows.
     *
     * <p>NOTE(review): the lock is released inside this method, i.e. before the surrounding
     * {@code @Transactional} boundary completes — confirm that window is acceptable.
     *
     * @param vo request carrying pipeline id, project id, snapshot name and picture
     * @return {@code true} when the snapshot was stored; {@code false} when the inserted
     *         {@code pipeline_snapshot} row did not yield a positive id
     * @throws DataScienceException if another snapshot operation holds the lock, the name already
     *         exists, the pipeline has no tasks, or it contains a graph-build task
     * @throws Exception any other failure, rethrown after being logged
     */
    @Transactional(rollbackFor = Exception.class)
    public Boolean saveSnapshot(PipelineSnapshotVO vo) throws Exception {
        String key = "pipeline_"+vo.getPipelineId()+"_snapshot";
        // Acquire the per-pipeline lock; fail fast when a snapshot operation is already running.
        if (!redisUtil.getLock(key, vo.getPipelineId().toString(), 120L)) {
            throw new DataScienceException(BaseErrorCode.PIPELINE_IS_PERFORMING_SNAPSHOT);
        }

        try {
            // The snapshot name must be unique within the pipeline.
            if (nameExist(vo.getPipelineId(), vo.getPipelineSnapshotName())) {
                throw new DataScienceException(BaseErrorCode.PIPELINE_SNAPSHOT_NAME_EXIST);
            }

            // A pipeline without tasks cannot be snapshotted.
            List<TaskDTO> tasks = taskMapper.queryByPipeline(vo.getPipelineId());
            if (tasks == null || tasks.isEmpty()) {
                throw new DataScienceException(BaseErrorCode.PIPELINE_NOT_TASK_EXIST);
            }

            List<String> taskStrings = new ArrayList<>();
            List<Long> cleanTypeTaskIds = new ArrayList<>();
            for (TaskDTO task : tasks) {
                // Graph-build tasks are not supported by snapshots: reject the whole pipeline.
                if (TaskTypeEnum.TASK_TYPE_GRAPH.getVal().equals(task.getType())) {
                    throw new DataScienceException(BaseErrorCode.PIPELINE_SNAPSHOT_NOT_SUPPORT_GRAPH_BUILD);
                }
                // Clean tasks keep their edit steps in task_instance, so those rows are saved too.
                if (TaskTypeEnum.TASK_TYPE_CLEAN.getVal().equals(task.getType())) {
                    cleanTypeTaskIds.add(task.getId());
                }
                taskStrings.add(JSON.toJSONString(task));
            }

            // Serialize the task instances that belong to clean-type tasks.
            List<String> taskInstanceStrings = new ArrayList<>();
            if (!cleanTypeTaskIds.isEmpty()) {
                List<TaskInstanceDTO> taskInstanceDTOS = taskInstanceMapper.queryByTaskIdsAndOrder(cleanTypeTaskIds);
                if (taskInstanceDTOS != null) {
                    for (TaskInstanceDTO taskInstance : taskInstanceDTOS) {
                        taskInstanceStrings.add(JSON.toJSONString(taskInstance));
                    }
                }
            }

            // When the snapshot store is full, make room by dropping the oldest snapshot.
            int count = pipelineSnapshotMapper.countByPipelineId(vo.getPipelineId());
            if (count >= Constant.PIPELINE_SNAPSHOT_CAPACITY) {
                deleteOldestSnapshot(vo.getPipelineId());
            }

            // Upload the picture to MinIO under a unique object name.
            String picturePath = vo.getProjectId()+"-"+JwtUtil.getCurrentUserId()+"-"+vo.getPipelineId()+"-"+UUID.randomUUID();
            minioService.putObject(Constant.PIPELINE_SNAPSHOT_MINIO_BUCKET, picturePath,
                new ByteArrayInputStream(vo.getPicture().getBytes(StandardCharsets.UTF_8)));

            // Insert the pipeline_snapshot row.
            PipelineSnapshotDTO psd = DozerUtil.mapper(vo, PipelineSnapshotDTO.class);
            psd.setName(vo.getPipelineSnapshotName());
            psd.setPicturePath(picturePath);
            psd.setGmtCreator(JwtUtil.getCurrentUserId());
            psd.setGmtModifier(JwtUtil.getCurrentUserId());
            long pipelineSnapshotId = save(psd);
            if (pipelineSnapshotId <= 0) {
                return false;
            }

            // Insert the task_snapshot rows.
            taskSnapshotService.batchAdd(buildTaskSnapshots(vo, pipelineSnapshotId, taskStrings));

            // Insert the task_instance_snapshot rows, if any.
            if (!taskInstanceStrings.isEmpty()) {
                taskInstanceSnapshotService.batchAdd(
                    buildTaskInstanceSnapshots(vo, pipelineSnapshotId, taskInstanceStrings));
            }

            return true;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw e;
        } finally {
            // Always release the lock, whatever the outcome.
            redisUtil.releaseLock(key);
        }
    }

    /**
     * Builds one {@link TaskSnapshotDTO} per serialized task.
     *
     * @param vo request providing pipeline id and project id
     * @param pipelineSnapshotId id of the owning pipeline snapshot
     * @param taskStrings tasks serialized as JSON
     * @return the DTOs, in the order of {@code taskStrings}
     */
    private List<TaskSnapshotDTO> buildTaskSnapshots(PipelineSnapshotVO vo, long pipelineSnapshotId,
        List<String> taskStrings) {
        List<TaskSnapshotDTO> taskSnapshots = new ArrayList<>();
        for (String taskString : taskStrings) {
            TaskSnapshotDTO taskSnapshot = new TaskSnapshotDTO();
            taskSnapshot.setPipelineSnapshotId(pipelineSnapshotId);
            taskSnapshot.setPipelineId(vo.getPipelineId());
            taskSnapshot.setProjectId(vo.getProjectId());
            taskSnapshot.setTaskJson(taskString);
            taskSnapshot.setGmtCreator(JwtUtil.getCurrentUserId());
            taskSnapshot.setGmtModifier(JwtUtil.getCurrentUserId());
            taskSnapshots.add(taskSnapshot);
        }
        return taskSnapshots;
    }

    /**
     * Builds one {@link TaskInstanceSnapshotDTO} per serialized task instance.
     *
     * @param vo request providing pipeline id and project id
     * @param pipelineSnapshotId id of the owning pipeline snapshot
     * @param taskInstanceStrings task instances serialized as JSON
     * @return the DTOs, in the order of {@code taskInstanceStrings}
     */
    private List<TaskInstanceSnapshotDTO> buildTaskInstanceSnapshots(PipelineSnapshotVO vo,
        long pipelineSnapshotId, List<String> taskInstanceStrings) {
        List<TaskInstanceSnapshotDTO> taskInstanceSnapshots = new ArrayList<>();
        for (String taskInstanceString : taskInstanceStrings) {
            TaskInstanceSnapshotDTO taskInstanceSnapshot = new TaskInstanceSnapshotDTO();
            taskInstanceSnapshot.setPipelineSnapshotId(pipelineSnapshotId);
            taskInstanceSnapshot.setPipelineId(vo.getPipelineId());
            taskInstanceSnapshot.setProjectId(vo.getProjectId());
            taskInstanceSnapshot.setTaskInstanceJson(taskInstanceString);
            taskInstanceSnapshot.setGmtCreator(JwtUtil.getCurrentUserId());
            taskInstanceSnapshot.setGmtModifier(JwtUtil.getCurrentUserId());
            taskInstanceSnapshots.add(taskInstanceSnapshot);
        }
        return taskInstanceSnapshots;
    }

    /**
     * Persists a pipeline snapshot row.
     *
     * @param pipelineSnapshot the row to insert
     * @return the id read back from {@code pipelineSnapshot} after the insert
     */
    public long save(PipelineSnapshotDTO pipelineSnapshot) {
        pipelineSnapshotMapper.save(pipelineSnapshot);
        return pipelineSnapshot.getId();
    }

    /**
     * Looks up a pipeline snapshot by id.
     *
     * @param id snapshot id
     * @return whatever the mapper returns for that id
     */
    public PipelineSnapshotDTO queryById(Long id) {
        return pipelineSnapshotMapper.queryById(id);
    }

    /**
     * Lists the snapshots of a pipeline, including each snapshot's picture read from MinIO.
     *
     * <p>A failure to load one picture is logged and does not abort the listing; that entry is
     * returned without a picture.
     *
     * @param pipelineId pipeline id
     * @return one view object per snapshot; empty when the mapper returns nothing
     */
    public List<PipelineSnapshotQueryVO> queryByPipeline(Long pipelineId) {
        List<PipelineSnapshotQueryVO> res = new ArrayList<>();
        List<PipelineSnapshotDTO> dtos = pipelineSnapshotMapper.queryByPipeline(pipelineId);
        if (dtos == null) {
            return res;
        }

        for (PipelineSnapshotDTO dto : dtos) {
            PipelineSnapshotQueryVO vo = PipelineSnapshotQueryVO.builder()
                .pipelineSnapshotId(dto.getId())
                .pipelineSnapshotName(dto.getName())
                .pipelineId(dto.getPipelineId())
                .projectId(dto.getProjectId())
                .gmtCreate(dto.getGmtCreate())
                .build();

            String path = dto.getPicturePath();
            if (StringUtils.isNotBlank(path)) {
                // try-with-resources closes the stream even when reading fails.
                try (InputStream in = minioService.getObject(Constant.PIPELINE_SNAPSHOT_MINIO_BUCKET, path)) {
                    vo.setPicture(FileUtil.inputStream2String(in));
                } catch (Exception e) {
                    // Best effort: keep listing the remaining snapshots.
                    logger.error(e.getMessage(), e);
                }
            }
            res.add(vo);
        }

        return res;
    }

    /**
     * Checks whether a snapshot name is already used within a pipeline.
     *
     * @param pipelineId pipeline id
     * @param pipelineSnapshotName snapshot name to look for
     * @return {@code true} when at least one matching snapshot is counted
     */
    public boolean nameExist(Long pipelineId, String pipelineSnapshotName) {
        return pipelineSnapshotMapper.countByPipelineAndName(pipelineId, pipelineSnapshotName) > 0;
    }

    /**
     * Deletes pipeline snapshot rows by id.
     *
     * @param ids snapshot ids
     * @return {@code true} when the mapper reports at least one affected row
     */
    public boolean delete(List<Long> ids) {
        return pipelineSnapshotMapper.delete(ids) > 0;
    }

    /**
     * Deletes the pipeline snapshot rows of a pipeline.
     *
     * @param pipelineId pipeline id
     * @return {@code true} when the mapper reports at least one affected row
     */
    public boolean deleteByPipeline(Long pipelineId) {
        return pipelineSnapshotMapper.deleteByPipeline(pipelineId) > 0;
    }

    /**
     * Deletes the pipeline snapshot rows of a project.
     *
     * @param projectId project id
     * @return {@code true} when the mapper reports at least one affected row
     */
    public boolean deleteByProject(Long projectId) {
        return pipelineSnapshotMapper.deleteByProject(projectId) > 0;
    }

    /**
     * Reports whether the pipeline has reached {@link Constant#PIPELINE_SNAPSHOT_CAPACITY}.
     *
     * @param pipelineId pipeline id
     * @return a JSON object with {@code "tips"} (a prompt when full, otherwise empty) and
     *         {@code "useful"} ({@code true} when the capacity is reached)
     */
    public JSONObject checkSnapshotThreshold(Long pipelineId) {
        int count = pipelineSnapshotMapper.countByPipelineId(pipelineId);
        boolean full = count >= Constant.PIPELINE_SNAPSHOT_CAPACITY;

        JSONObject res = new JSONObject();
        res.put("tips", full ? "版本库已满，是否删除最早版本保存当前版本？" : "");
        res.put("useful", full);
        return res;
    }

    /**
     * Deletes the requested snapshots of a pipeline, together with their task snapshots, task
     * instance snapshots and MinIO pictures.
     *
     * @param vo request carrying the pipeline id and the snapshot ids to delete
     * @return {@code true} once everything has been deleted
     * @throws DataScienceException if the pipeline has no snapshots, or the id list is missing or
     *         contains an id that does not belong to the pipeline
     * @throws Exception if a downstream call fails; checked exceptions also roll back the
     *         transaction
     */
    @Transactional(rollbackFor = Exception.class)
    public Boolean deleteSnapshot(PipelineSnapshotVO vo) throws Exception {
        List<Long> deleteIds = vo.getPipelineSnapshotIds();
        List<PipelineSnapshotDTO> snapshots = pipelineSnapshotMapper.querySnapshotByPipeline(vo.getPipelineId());

        if (snapshots == null || snapshots.isEmpty()) {
            throw new DataScienceException(BaseErrorCode.PIPELINE_SNAPSHOT_EMPTY);
        }

        List<Long> ids = new ArrayList<>();
        for (PipelineSnapshotDTO snapshot : snapshots) {
            ids.add(snapshot.getId());
        }

        // Every requested id must belong to this pipeline; a missing id list is rejected the same
        // way instead of failing with a NullPointerException.
        if (deleteIds == null || !ids.containsAll(deleteIds)) {
            throw new DataScienceException(BaseErrorCode.PIPELINE_SNAPSHOT_ID_NOT_EXIST);
        }

        // Remove the pipeline_snapshot rows.
        pipelineSnapshotMapper.delete(deleteIds);

        // Remove the task_snapshot rows.
        taskSnapshotService.deleteByPipelineSnapshotId(deleteIds);

        // Remove the task_instance_snapshot rows.
        taskInstanceSnapshotService.deleteByPipelineSnapshotId(deleteIds);

        // Remove the pictures stored in MinIO.
        for (PipelineSnapshotDTO snapshot : snapshots) {
            if (deleteIds.contains(snapshot.getId()) && StringUtils.isNotBlank(snapshot.getPicturePath())) {
                minioService.deleteObject(Constant.PIPELINE_SNAPSHOT_MINIO_BUCKET, snapshot.getPicturePath());
            }
        }

        return true;
    }

    /**
     * Restores a pipeline from a snapshot: the pipeline's current tasks are deleted and replaced
     * by the tasks (and clean-task instances) stored in the snapshot.
     *
     * <p>The operation is guarded by the same per-pipeline Redis lock as
     * {@link #saveSnapshot(PipelineSnapshotVO)}.
     *
     * <p>NOTE(review): unlike {@code saveSnapshot}, a busy lock yields {@code false} here instead
     * of an exception — confirm callers rely on that before aligning the two.
     *
     * @param vo request carrying the pipeline id and the snapshot id to restore
     * @return {@code true} when the pipeline was restored; {@code false} when the lock could not
     *         be acquired
     * @throws DataScienceException if the snapshot contains no tasks
     */
    @Transactional(rollbackFor = Exception.class)
    public Boolean recoverSnapshot(PipelineSnapshotVO vo) {
        String key = "pipeline_"+vo.getPipelineId()+"_snapshot";
        // Acquire the per-pipeline lock.
        if (!redisUtil.getLock(key, vo.getPipelineId().toString(), 120L)) {
            return false;
        }

        try {
            // Load the tasks stored in the snapshot.
            List<TaskSnapshotDTO> tsds = taskSnapshotService.queryByPipelineAndSnapshotId(vo.getPipelineId(), vo.getPipelineSnapshotId());
            if (tsds == null || tsds.isEmpty()) {
                throw new DataScienceException(BaseErrorCode.PIPELINE_SNAPSHOT_TASK_EMPTY);
            }
            List<TaskDTO> tasks = new ArrayList<>();
            for (TaskSnapshotDTO tsd : tsds) {
                tasks.add(JSON.parseObject(tsd.getTaskJson(), TaskDTO.class));
            }

            // Load the task instances stored in the snapshot.
            List<TaskInstanceSnapshotDTO> tisds = taskInstanceSnapshotService.queryByPipelineAndSnapshotId(vo.getPipelineId(), vo.getPipelineSnapshotId());
            List<TaskInstanceDTO> taskInstances = new ArrayList<>();
            if (tisds != null) {
                for (TaskInstanceSnapshotDTO tisd : tisds) {
                    taskInstances.add(JSON.parseObject(tisd.getTaskInstanceJson(), TaskInstanceDTO.class));
                }
            }

            // Wipe the pipeline's current tasks.
            List<Long> taskIds = taskMapper.queryIdByPipeline(vo.getPipelineId());
            if (taskIds != null && !taskIds.isEmpty()) {
                taskService.batchDelete(new JSONObject(), taskIds, new ArrayList<>(), vo.getPipelineId());
            }

            // Write the snapshot's tasks back into the pipeline.
            taskService.batchSave(tasks);

            // Write the clean-task task_instance rows back.
            if (!taskInstances.isEmpty()) {
                taskInstanceMapper.batchSave(taskInstances);
            }

            return true;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw e;
        } finally {
            // Always release the lock, whatever the outcome.
            redisUtil.releaseLock(key);
        }
    }

    /**
     * Deletes the oldest snapshot of a pipeline together with its task snapshots, task instance
     * snapshots and MinIO picture. Does nothing when {@code pipelineId} is {@code null} or the
     * pipeline has no snapshot.
     *
     * @param pipelineId pipeline id
     * @throws DataScienceException with {@code PIPELINE_SNAPSHOT_DELETE_EXCEPTION} when any step
     *         fails; the original exception is logged
     */
    private void deleteOldestSnapshot(Long pipelineId) {
        if (pipelineId == null) {
            return;
        }
        try {
            // Find the oldest snapshot.
            PipelineSnapshotDTO oldestSnapshot = pipelineSnapshotMapper.queryOldestPipelineSnapshot(pipelineId);
            if (oldestSnapshot == null) {
                return;
            }
            List<Long> oldestIds = Arrays.asList(oldestSnapshot.getId());

            // Remove the pipeline_snapshot row.
            delete(oldestIds);
            // Remove the task_snapshot rows.
            taskSnapshotService.deleteByPipelineSnapshotId(oldestIds);
            // Task instance snapshots may not exist, so the result is deliberately not checked.
            taskInstanceSnapshotService.deleteByPipelineSnapshotId(oldestIds);

            // Remove the picture stored in MinIO.
            if (StringUtils.isNotBlank(oldestSnapshot.getPicturePath())) {
                minioService.deleteObject(Constant.PIPELINE_SNAPSHOT_MINIO_BUCKET, oldestSnapshot.getPicturePath());
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new DataScienceException(BaseErrorCode.PIPELINE_SNAPSHOT_DELETE_EXCEPTION);
        }
    }
}
